{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# PySpark With Hive\n",
    "\n",
    "In this notebook we'll cover how you can read/write to Hive using SparkSQL, this notebook assumes that you have enabled the service \"Hive\" in your project"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a SparkSession with Hive Enabled\n",
    "\n",
    "sparkmagic automatically creates a spark session in the cluster for us with Hive enabled"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting Spark application\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<table>\n",
       "<tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th><th>Current session?</th></tr><tr><td>0</td><td>application_1540813611542_0002</td><td>pyspark</td><td>idle</td><td><a target=\"_blank\" href=\"http://hopsworks0.logicalclocks.com:8088/proxy/application_1540813611542_0002/\">Link</a></td><td><a target=\"_blank\" href=\"http://hopsworks0.logicalclocks.com:8042/node/containerlogs/container_e01_1540813611542_0002_01_000001/test__meb10000\">Link</a></td><td>✔</td></tr></table>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "SparkSession available as 'spark'.\n",
      "<pyspark.sql.session.SparkSession object at 0x7f183f464860>"
     ]
    }
   ],
   "source": [
    "spark"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Select Hive Database\n",
    "\n",
    "Using the spark session you can interact with Hive through the `sql` method on the sparkSession, or through auxillary methods likes `.select()` and `.where()`. \n",
    "\n",
    "Each project that have enabled Hive will automatically have a Hive database created for them, this is the only Hive database that you can access unless someone have shared their database with you."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "from hops import hdfs as hopsfs\n",
    "PROJECT_NAME = hopsfs.project_name()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "'test'"
     ]
    }
   ],
   "source": [
    "PROJECT_NAME"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "DataFrame[]"
     ]
    }
   ],
   "source": [
    "spark.sql(\"use \" + PROJECT_NAME)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Tables\n",
    "\n",
    "Tables can be created either by issuing a `CREATE TABLE` statement or by using the `saveAsTable()` method on an existing dataframe. When using `saveAsTable` spark will infer the schema from the dataframe and do the `CREATE TABLE` for you. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+---------+-----------+\n",
      "|database|tableName|isTemporary|\n",
      "+--------+---------+-----------+\n",
      "+--------+---------+-----------+"
     ]
    }
   ],
   "source": [
    "spark.sql(\"show tables\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "DataFrame[]"
     ]
    }
   ],
   "source": [
    "spark.sql(\"CREATE TABLE MAGIC_MATRIX (position int, value float) STORED AS ORC\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+------------+-----------+\n",
      "|database|   tableName|isTemporary|\n",
      "+--------+------------+-----------+\n",
      "|    test|magic_matrix|      false|\n",
      "+--------+------------+-----------+"
     ]
    }
   ],
   "source": [
    "spark.sql(\"show tables\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.types import *\n",
    "schema = StructType([StructField('SquaredValue', IntegerType(), True)])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SQLContext\n",
    "sqlContext = SQLContext(spark.sparkContext)\n",
    "rddValues = spark.sparkContext.parallelize(list(range(0,100))).map(lambda x: [x*x])\n",
    "dfValues = sqlContext.createDataFrame(rddValues,schema)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------+\n",
      "|SquaredValue|\n",
      "+------------+\n",
      "|           0|\n",
      "|           1|\n",
      "|           4|\n",
      "|           9|\n",
      "|          16|\n",
      "+------------+\n",
      "only showing top 5 rows"
     ]
    }
   ],
   "source": [
    "dfValues.show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "dfValues.write.format(\"ORC\").mode(\"overwrite\").saveAsTable(\"SquaredValues\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+-------------+-----------+\n",
      "|database|    tableName|isTemporary|\n",
      "+--------+-------------+-----------+\n",
      "|    test| magic_matrix|      false|\n",
      "|    test|squaredvalues|      false|\n",
      "+--------+-------------+-----------+"
     ]
    }
   ],
   "source": [
    "spark.sql(\"show tables\").show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Insert Values\n",
    "\n",
    "Values can be inserted with plain SQL or by using `saveAsTable` / `insertInto`"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Simple Insert"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "DataFrame[]"
     ]
    }
   ],
   "source": [
    "spark.sql(\"INSERT INTO TABLE magic_matrix VALUES (1, 99), (2, 100)\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+-----+\n",
      "|position|value|\n",
      "+--------+-----+\n",
      "|       1| 99.0|\n",
      "|       2|100.0|\n",
      "+--------+-----+"
     ]
    }
   ],
   "source": [
    "spark.sql(\"SELECT * FROM magic_matrix\").show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Insert  with saveAsTable"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "rddValues2 = spark.sparkContext.parallelize(list(range(100,200))).map(lambda x: [x*x])\n",
    "dfValues2 = sqlContext.createDataFrame(rddValues2,schema)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+\n",
      "|count(1)|\n",
      "+--------+\n",
      "|     100|\n",
      "+--------+"
     ]
    }
   ],
   "source": [
    "spark.sql(\"SELECT COUNT(*) FROM squaredvalues\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "dfValues2.write.format(\"ORC\").mode(\"append\").saveAsTable(\"squaredvalues\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+\n",
      "|count(1)|\n",
      "+--------+\n",
      "|     200|\n",
      "+--------+"
     ]
    }
   ],
   "source": [
    "spark.sql(\"SELECT COUNT(*) FROM squaredvalues\").show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Insert with insertInto"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "dfValues2.write.mode(\"append\").insertInto(\"squaredvalues\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+\n",
      "|count(1)|\n",
      "+--------+\n",
      "|     300|\n",
      "+--------+"
     ]
    }
   ],
   "source": [
    "spark.sql(\"SELECT COUNT(*) FROM squaredvalues\").show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can also use overwrite mode:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [],
   "source": [
    "dfValues2.write.format(\"ORC\").mode(\"overwrite\").saveAsTable(\"squaredvalues\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+\n",
      "|count(1)|\n",
      "+--------+\n",
      "|     100|\n",
      "+--------+"
     ]
    }
   ],
   "source": [
    "spark.sql(\"REFRESH TABLE squaredvalues\")\n",
    "spark.sql(\"SELECT COUNT(*) FROM squaredvalues\").show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Insert by using TempTable"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "rddValues3 = spark.sparkContext.parallelize(list(range(200,300))).map(lambda x: [x*x])\n",
    "dfValues3 = sqlContext.createDataFrame(rddValues3,schema)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+\n",
      "|count(1)|\n",
      "+--------+\n",
      "|     200|\n",
      "+--------+"
     ]
    }
   ],
   "source": [
    "spark.sql(\"SELECT COUNT(*) FROM squaredvalues\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [],
   "source": [
    "dfValues3.registerTempTable(\"temptable\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "DataFrame[]"
     ]
    }
   ],
   "source": [
    "sqlContext.sql(\"insert into table squaredvalues select * from temptable\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+\n",
      "|count(1)|\n",
      "+--------+\n",
      "|     300|\n",
      "+--------+"
     ]
    }
   ],
   "source": [
    "spark.sql(\"SELECT COUNT(*) FROM squaredvalues\").show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Queries"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------+\n",
      "|SquaredValue|\n",
      "+------------+\n",
      "|       40000|\n",
      "|       40401|\n",
      "|       40804|\n",
      "|       41209|\n",
      "|       41616|\n",
      "|       42025|\n",
      "|       42436|\n",
      "|       42849|\n",
      "|       43264|\n",
      "|       43681|\n",
      "|       44100|\n",
      "|       44521|\n",
      "|       44944|\n",
      "|       45369|\n",
      "|       45796|\n",
      "|       46225|\n",
      "|       46656|\n",
      "|       47089|\n",
      "|       47524|\n",
      "|       47961|\n",
      "+------------+\n",
      "only showing top 20 rows"
     ]
    }
   ],
   "source": [
    "spark.sql(\"SELECT * FROM squaredvalues WHERE squaredvalue > 380 \").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+-----+\n",
      "|position|value|\n",
      "+--------+-----+\n",
      "|       2|100.0|\n",
      "+--------+-----+"
     ]
    }
   ],
   "source": [
    "spark.sql(\"SELECT * FROM magic_matrix WHERE position = 2 \").show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Drop Tables"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------+-------------+-----------+\n",
      "|      database|    tableName|isTemporary|\n",
      "+--------------+-------------+-----------+\n",
      "|sparksqlonhive| magic_matrix|      false|\n",
      "|sparksqlonhive|squaredvalues|      false|\n",
      "|              |    temptable|       true|\n",
      "+--------------+-------------+-----------+"
     ]
    }
   ],
   "source": [
    "spark.sql(\"SHOW TABLES\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "DataFrame[]"
     ]
    }
   ],
   "source": [
    "spark.sql(\"DROP TABLE magic_matrix\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------+-------------+-----------+\n",
      "|      database|    tableName|isTemporary|\n",
      "+--------------+-------------+-----------+\n",
      "|sparksqlonhive|squaredvalues|      false|\n",
      "|              |    temptable|       true|\n",
      "+--------------+-------------+-----------+"
     ]
    }
   ],
   "source": [
    "spark.sql(\"SHOW TABLES\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "DataFrame[]"
     ]
    }
   ],
   "source": [
    "spark.sql(\"DROP TABLE squaredvalues\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+---------+-----------+\n",
      "|database|tableName|isTemporary|\n",
      "+--------+---------+-----------+\n",
      "|        |temptable|       true|\n",
      "+--------+---------+-----------+"
     ]
    }
   ],
   "source": [
    "spark.sql(\"SHOW TABLES\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.catalog.dropTempView(\"temptable\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+---------+-----------+\n",
      "|database|tableName|isTemporary|\n",
      "+--------+---------+-----------+\n",
      "+--------+---------+-----------+"
     ]
    }
   ],
   "source": [
    "spark.sql(\"SHOW TABLES\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 2
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
